Akka StreamsでSQSのメッセージを処理するワーカーを構築する
こんにちは、Akka Streamsに可能性を感じている山崎です。
今回はAkka StreamsでSQSからメッセージを取得し、処理を行うワーカーを構築する方法をご紹介します。
SQSとは
AWSが提供する分散メッセージキューサービスです。SQSを使用することでシステム間のメッセージ送信が疎結合になり、スケーラビリティの高いシステムを構築することができます。
SQSのFAQでは、動画トランスコーディングのリクエストを受け取った際、一旦SQSに動画のトランスコーディングを行うためのメッセージを投入し、複数台のEC2インスタンス上で実行されているジョブワーカーがSQSからメッセージを取得して実際にトランスコーディング処理を実行する、という例が紹介されています。
SQSについての解説はこちらに、SQS+ジョブワーカーという構成についてはこちらにまとまっていますのでご覧ください。
実装してみる
取得したメッセージのボディのJsonをデコードして標準出力に出力する、というシンプルな例を以下の4ステップに分けて実装していきます。
- SQSからメッセージを取得する
Source
- SQS上のメッセージを削除する
Sink
- メッセージを受け取りジョブを実行する
Flow
- つなげて実行する部分
実際にコードを見ていきましょう。
事前準備
依存ライブラリ
使用するライブラリをbuild.sbtに記述します。
libraryDependencies ++= Seq( "com.amazonaws" % "aws-java-sdk-sqs" % "1.11.205", "com.typesafe.akka" %% "akka-stream" % "2.5.6", "com.typesafe.akka" %% "akka-stream-testkit" % "2.5.6" % Test, "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.13", "io.circe" %% "circe-core" % "0.8.0", "io.circe" %% "circe-generic" % "0.8.0", "io.circe" %% "circe-parser" % "0.8.0" )
implicit
これから実装していくコードで必要なimplicitな値を先に定義しておきます。
implicit val sqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder .standard() .build() implicit val system: ActorSystem = ActorSystem() implicit val ec: ExecutionContext = system.dispatcher implicit val mat: ActorMaterializer = ActorMaterializer()
SQSからメッセージを取得するSource
引数に渡したURLのキューからメッセージを取得します。
val sqsSource: Source[Message, NotUsed] = SqsSource(queueUrl)
このSourceが下流に流す要素の型Message
はAWS SDKによって定義されているものです。
SQS上のメッセージを削除するSink
処理が終わったSQS上のメッセージを削除します。
val sqsAckSink: Sink[(Message, MessageAction), Future[Done]] = SqsAckSink(queueUrl)
このSinkに、メッセージと行いたい操作の組を流し込むことで、SQS上の当該メッセージに対して削除などの操作を行うことができます。
メッセージに対して行う操作を表すMessageAction
は以下の三種類が存在します。
Delete()
SQS上からメッセージを削除します。-
Ignore()
SQS上のメッセージに対して何も行いません。(可視性タイムアウトの後、再びメッセージが取得できる状態となります) -
ChangeMessageVisibility(visibilityTimeout: Int)
可視性タイムアウトを指定された秒数分延長します。
メッセージを受け取りジョブを実行するFlow
Messageの処理を行う部分を実装していきます。
メッセージを受け取って処理を行う部分のコードは以下です。
val messageProcessingFlow = Flow[Message].mapAsyncUnordered(10) { message => processMessage(message).map(action => (message, action)) }
mapAsyncUnorderd
を使うことでそれぞれのメッセージに対しての処理を並列に行っています。
メッセージを受け取ったときに何をするのかを実装している部分は以下のコードです。
def processMessage(message: Message): Future[MessageAction] = { def decoded = Future.fromTry(decodeMessage(message)) def processed = for { entity <- decoded _ <- processEntity(entity) } yield () processed .map{ _ => Delete() } .recover { case _ => Ignore() } } /** * 送られてきたメッセージをEntityクラスのインスタンスに変換します */ def decodeMessage(message: Message): Try[Entity] = { import io.circe._ import io.circe.generic.auto._ parser.decode[Entity](message.getBody).toTry } def processEntity(entity: Entity) = Future { /* 今回はダミーの処理としてメッセージの内容をコンソールに出力する */ println(entity) }
processMessage()
の中で、処理の結果をSQS上のメッセージに対する操作に変換しています。処理が成功した際にはDelete()
を返しSQS上からメッセージが削除されるように、処理が失敗した際にはIgnore()
を返し、後で再びメッセージが処理されるようにしています。
つなげて実行
なんらかの理由で処理が終了してしまった際に自動的に再び実行されるようにするようにするため、実行する際にも少し工夫をします。
Akka StreamsにはRestartSource
、RestartSink
、RestartFlow
というユーティリティが用意されており、これらを使用するとエクスポネンシャル・バックオフアルゴリズムを使用して順番に間隔を伸ばしながら再実行してくれます。これを使用して実行したのが以下です。
val graphToRun: RunnableGraph[Future[Done]] = sqsSource.via(messageProcessingFlow).toMat(sqsAckSink)(Keep.right) RestartSource.withBackoff( minBackoff = 3.seconds, maxBackoff = 60.seconds, randomFactor = 0.2 ) { () => Source.fromFuture(graphToRun.run()) } .runWith(Sink.ignore)
一回の実行の結果を表すFutureをSourceに変換し、それに対してRestartSource
を適用しています。
これにより、処理が終了してしまった際に処理が再び実行されます。
実際に実行して試してみる
それでは実際に実行して、動作を見てみましょう。今回はSQSの代わりにElasticMQを使用して動作を確認します。
ローカルのDocker上でElasticMQを実行しておきます。また、別プロセスでElasticMQにメッセージを送信し続ける処理を走らせておきます。
この状態で今回実装したワーカーを起動すると
このようにElasticMQからメッセージを取得して処理が実行されました。
まとめ
今回は、Akka Streamsを使用してSQSからメッセージを取得して処理を行うワーカーを実装する方法をご紹介しました。Akka Streamsによって、ワーカーが宣言的な記述により実装できたのがお分かりいただけたと思います。
Alpakkaには、他にもKinesisやElastic SearchをAkka Streamsのコンポーネントとして扱うためのコネクターもあり、調べれば調べるほどAkka Streamsを使ってできることは多いという印象を強く受けます。今後もこのブログで実際の処理の例を記事にして紹介していけたらと思っています。